package disruptor;

import com.lmax.disruptor.BatchRewindStrategy;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.EventProcessor;
import com.lmax.disruptor.RewindableEventHandler;
import com.lmax.disruptor.RewindableException;
import com.lmax.disruptor.Sequence;
import com.lmax.disruptor.SequenceBarrier;
import java.util.Arrays;

/**
 * A set of {@link EventProcessor}s, tracked through their {@link Sequence}s, that is handed out
 * by the {@link Disruptor2} DSL so that further consumers can be chained behind it.
 *
 * @param <T> the type of entry used by the event processors.
 */
public class EventHandlerGroup<T>
{
  private final Disruptor2<T> disruptor;
  private final ConsumerRepository2 consumerRepository;
  private final Sequence[] sequences;

  /**
   * Builds a group over the given sequences.
   *
   * @param disruptor          the disruptor that new event processors are created through.
   * @param consumerRepository the repository that externally supplied processors are registered with.
   * @param sequences          the sequences of the processors that make up this group.
   */
  EventHandlerGroup(
      final Disruptor2<T> disruptor,
      final ConsumerRepository2 consumerRepository,
      final Sequence[] sequences)
  {
    this.disruptor = disruptor;
    this.consumerRepository = consumerRepository;
    // Private snapshot: later writes to the caller's array must not alter this group.
    this.sequences = sequences.clone();
  }

  /**
   * Merge this group with <code>otherHandlerGroup</code> into one new group.
   *
   * @param otherHandlerGroup the event handler group whose consumers are appended to this group's.
   * @return a new {@link EventHandlerGroup} holding this group's sequences followed by the other group's.
   */
  public EventHandlerGroup<T> and(final EventHandlerGroup<T> otherHandlerGroup)
  {
    final Sequence[] mine = this.sequences;
    final Sequence[] theirs = otherHandlerGroup.sequences;

    // Grow a copy of our own sequences (typed as a plain Sequence[]) and fill the tail with theirs.
    final Sequence[] merged = Arrays.copyOf(mine, mine.length + theirs.length, Sequence[].class);
    System.arraycopy(theirs, 0, merged, mine.length, theirs.length);

    return new EventHandlerGroup<>(disruptor, consumerRepository, merged);
  }

  /**
   * Merge this group with the supplied <code>processors</code> into one new group. Each processor
   * is also added to the consumer repository.
   *
   * @param processors the processors to combine with this group.
   * @return a new {@link EventHandlerGroup} holding the processors' sequences followed by this group's.
   */
  public EventHandlerGroup<T> and(final EventProcessor... processors)
  {
    final int added = processors.length;
    final Sequence[] merged = new Sequence[added + sequences.length];

    // The existing sequences occupy the tail; the new processors' sequences fill the head.
    System.arraycopy(sequences, 0, merged, added, sequences.length);

    int slot = 0;
    for (final EventProcessor processor : processors)
    {
      consumerRepository.add(processor);
      merged[slot++] = processor.getSequence();
    }

    return new EventHandlerGroup<>(disruptor, consumerRepository, merged);
  }

  /**
   * <p>Register batch handlers that consume events from the ring buffer only after every
   * {@link EventProcessor} in this group has processed the event.</p>
   *
   * <p>Typically used when chaining. To make handler <code>A</code> process each event before
   * handler <code>B</code>:</p>
   *
   * <pre><code>dw.handleEventsWith(A).then(B);</code></pre>
   *
   * <p>Equivalent to {@link #handleEventsWith(EventHandler[])}.</p>
   *
   * @param handlers the batch handlers that will process events.
   * @return an {@link EventHandlerGroup} that can be used to set up an event processor barrier over the created event processors.
   */
  @SafeVarargs
  public final EventHandlerGroup<T> then(final EventHandler<? super T>... handlers)
  {
    return this.handleEventsWith(handlers);
  }

  /**
   * <p>Register rewindable batch handlers that consume events from the ring buffer only after every
   * {@link EventProcessor} in this group has processed the event.</p>
   *
   * <p>Typically used when chaining. To make handler <code>A</code> process each event before
   * handler <code>B</code>:</p>
   *
   * <pre><code>dw.handleEventsWith(A).then(B);</code></pre>
   *
   * <p>Equivalent to {@link #handleEventsWith(BatchRewindStrategy, RewindableEventHandler[])}.</p>
   *
   * @param batchRewindStrategy a {@link BatchRewindStrategy} for customizing how to handle a {@link RewindableException}.
   * @param handlers            the rewindable event handlers that will process events.
   * @return an {@link EventHandlerGroup} that can be used to set up an event processor barrier over the created event processors.
   */
  @SafeVarargs
  public final EventHandlerGroup<T> then(final BatchRewindStrategy batchRewindStrategy,
      final RewindableEventHandler<? super T>... handlers)
  {
    return this.handleEventsWith(batchRewindStrategy, handlers);
  }

  /**
   * <p>Register batch handlers that handle events from the ring buffer only after every
   * {@link EventProcessor} in this group has processed the event.</p>
   *
   * <p>Typically used when chaining. To make <code>A</code> process each event before
   * <code>B</code>:</p>
   *
   * <pre><code>dw.after(A).handleEventsWith(B);</code></pre>
   *
   * @param handlers the batch handlers that will process events.
   * @return an {@link EventHandlerGroup} that can be used to set up an event processor barrier over the created event processors.
   */
  @SafeVarargs
  public final EventHandlerGroup<T> handleEventsWith(final EventHandler<? super T>... handlers)
  {
    // This group's sequences are passed as the dependencies of the new processors.
    return disruptor.createEventProcessors(sequences, handlers);
  }

  /**
   * <p>Register rewindable batch handlers that handle events from the ring buffer only after every
   * {@link EventProcessor} in this group has processed the event.</p>
   *
   * <p>Typically used when chaining. To make <code>A</code> process each event before
   * <code>B</code>:</p>
   *
   * <pre><code>dw.after(A).handleEventsWith(B);</code></pre>
   *
   * @param batchRewindStrategy a {@link BatchRewindStrategy} for customizing how to handle a {@link RewindableException}.
   * @param handlers            the rewindable event handlers that will process events.
   * @return an {@link EventHandlerGroup} that can be used to set up an event processor barrier over the created event processors.
   */
  @SafeVarargs
  public final EventHandlerGroup<T> handleEventsWith(final BatchRewindStrategy batchRewindStrategy,
      final RewindableEventHandler<? super T>... handlers)
  {
    // This group's sequences are passed as the dependencies of the new processors.
    return disruptor.createEventProcessors(sequences, batchRewindStrategy, handlers);
  }


  /**
   * Create a dependency barrier over the processors in this group, so that custom event
   * processors can depend on {@link com.lmax.disruptor.BatchEventProcessor}s created by the
   * disruptor.
   *
   * @return a {@link SequenceBarrier} including all the processors in this group.
   */
  public SequenceBarrier asSequenceBarrier()
  {
    return disruptor.getRingBuffer().newBarrier(sequences);
  }
}

